3872511bfc0fadbbe02a7970ca0e891e9741e43a,cdap-app-templates/cdap-etl/cdap-etl-lib/src/main/java/co/cask/cdap/templates/etl/realtime/sources/JmsSource.java,JmsSource,poll,#Emitter#SourceState#,118

Before Change


      return currentState;
    }

    writer.emit(text);

    return new SourceState(currentState.getState());
  }

After Change


    // Try to get message from Queue
    Message message = null;

    int count = 0;
    do {
      try {
        message = consumer.receive(JMS_CONSUMER_TIMEOUT_MS);
      } catch (JMSException e) {
        LOG.warn("Exception when trying to receive message from JMS consumer: {}", CDAP_JMS_SOURCE_NAME);
      }
      if (message != null) {
        String text;
        try {
          if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            text = textMessage.getText();
            LOG.trace("Process JMS TextMessage : ", text);
          } else if (message instanceof BytesMessage) {
            BytesMessage bytesMessage = (BytesMessage) message;
            int bodyLength = (int) bytesMessage.getBodyLength();
            byte[] data = new byte[bodyLength];
            int bytesRead = bytesMessage.readBytes(data);
            if (bytesRead != bodyLength) {
              LOG.warn("Number of bytes read {} not same as expected {}", bytesRead, bodyLength);
            }
            text = new String(data).intern();
            LOG.trace("Processing JMS ByteMessage : {}", text);
          } else {
            // Different kind of messages, just get String for now
            // TODO Process different kind of JMS messages
            text = message.toString();
            LOG.trace("Processing JMS message : ", text);
          }
        } catch (JMSException e) {
          LOG.error("Unable to read text from a JMS Message.");
          continue;
        }

        writer.emit(text);
        count++;
      }
    } while (message != null && count < messagesToReceive);

    return new SourceState(currentState.getState());
  }